注意:Kakfa 0.8版本支持在2.3版本中标记为弃用。
1 接收器方式
这种方式通过执行器上的接收器从Kafka接收数据。详见receiver reliability
默认配置存在故障时丢失数据的风险,需要使用预写日志保存接收的数据。详见Deploying section
(1) 连接
1 | groupId = org.apache.spark |
(2) 编程
1 | import org.apache.spark.streaming.kafka._ |
也可以使用键值类和解码器创建,详见API docs
注意:
- 这种方式下主题分区与RDD分区不相关。增加接收的主题分区只是增加接收的线程数。
- 可以使用多个接收器
- 如果使用了预写日志, 数据已经和日志一同副本。因此可以设置存储等级为StorageLevel.MEMORY_AND_DISK_SER
(3) 部署
- 打包spark-streaming-kafka-0-8_2.11及其依赖
- 为spark-core_2.11和spark-streaming_2.11声明provided
2 直接方式
直接方式确保更强的端到端保证,周期性查询每个分区的最新偏移,并根据定义的偏移范围批量处理。
相较接收器方式,提供了以下便利:
简化并行机制
无需创建多个接收器并合并,自动实现Kafka分区和RDD分区一对一的映射。
存储效率
接收器方式对同一条记录实现了两次副本:Kakfa自身和预写日志,造成了效率上的损失。而直接方式直接利用Kafka的副本实现,无需预写日志。
刚好一次
接收器方式中偏移保存在Zookeeper中,存在因故障导致Spark Streaming和Zookeeper数据不一致而多次消费的情况。直接方式将偏移保存在检查点中,避免了不一致情形的发生,只需要保证输出的幂等或原子性事务。
注意:直接方式不会更新ZooKeeper中的偏移,因此基于ZooKeeper的监控工具不能显示进度,可以自行获取并更新偏移。
(1) 连接
1 | groupId = org.apache.spark |
(2) 编程
1 | import org.apache.spark.streaming.kafka._ |
也可以通过messageHandler创建以操作MessageAndMetadata。详见API docs.
注意:默认从最近的偏移消费,可以设置auto.offset.reset为smallest从最远的偏移消费。
可以使用以下方式访问偏移:
1 | // Hold a reference to the current offset ranges, so it can be used downstream |
注意:
- HasOffsetRanges类型捕获仅对directKafkaStream的第一个方法调用有效
- 分区映射关系仅在shuffle或repartition前有效
- spark.streaming.receiver. 配置仅对接收器方式有效,spark.streaming.kafka.对直接方式有效
(3) 部署
同上
参考资料
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)